/*
 * Copyright (c) 2020 - present, Inspur Genersoft Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.iec.edp.caf.rpc.client;

import io.iec.edp.caf.boot.context.CAFContext;
import io.iec.edp.caf.common.JSONSerializer;
import io.iec.edp.caf.commons.core.SerializerFactory;
import io.iec.edp.caf.commons.core.enums.SerializeType;
import io.iec.edp.caf.commons.exception.CAFRuntimeException;
import io.iec.edp.caf.commons.exception.ExceptionLevel;
import io.iec.edp.caf.commons.exception.entity.DefaultExceptionProperties;
import io.iec.edp.caf.commons.exception.entity.ExceptionErrorCode;
import io.iec.edp.caf.commons.runtime.CafEnvironment;
import io.iec.edp.caf.commons.utils.SpringBeanUtils;
import io.iec.edp.caf.core.session.ICafSessionService;
import io.iec.edp.caf.logging.CommonConstant;
import io.iec.edp.caf.msu.client.exception.ServiceUnitNotFoundException;
import io.iec.edp.caf.rpc.api.channel.RpcAbstractChannel;
import io.iec.edp.caf.rpc.api.common.RpcChannelType;
import io.iec.edp.caf.rpc.api.entity.RpcReturnValueDefinition;
import io.iec.edp.caf.rpc.api.entity.RpcServiceMethodDefinition;
import io.iec.edp.caf.rpc.api.event.RpcClientEventBroker;
import io.iec.edp.caf.rpc.api.serialize.RpcSerializeUtil;
import io.iec.edp.caf.rpc.api.service.CAFRpcClient;
import io.iec.edp.caf.rpc.api.service.InternalServiceManageService;
import io.iec.edp.caf.rpc.api.service.RpcClient;
import io.iec.edp.caf.rpc.api.support.*;
import io.iec.edp.caf.rpc.api.utils.Validator;
import io.iec.edp.caf.rpc.client.local.RpcLocalChecker;
import io.iec.edp.caf.rpc.client.local.RpcLocalInvoker;
import io.iec.edp.caf.tenancy.api.ITenantRouteService;
import io.iec.edp.caf.tenancy.api.ITenantService;
import io.iec.edp.caf.tenancy.api.context.RequestTenantContextHolder;
import io.iec.edp.caf.tenancy.api.context.RequestTenantContextInfo;
import io.iec.edp.caf.tenancy.api.entity.TenancyMode;
import io.iec.edp.caf.tenancy.api.exception.TenantNotFoundException;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.util.StringUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.util.*;

import static java.util.stream.Collectors.toList;

/**
 * {@link CAFRpcClient} implementation that dispatches an RPC call either to a service
 * unit hosted in the current process (local invoke) or to a remote service unit through
 * an {@link RpcAbstractChannel}.
 *
 * <p>Calls whose channel type is {@link RpcChannelType#REST} are delegated to the
 * {@link RpcClient} bean. Stream based invocation is not implemented.
 */
@Slf4j
public class CAFRpcClientImpl implements CAFRpcClient {

    private final InternalServiceManageService management;

    private final RpcLocalInvoker rpcLocalInvoker;

    private final ITenantService tenantService;

    private final ITenantRouteService tenantRouteService;

    /** Name of the cookie that carries the Base64 encoded session id. */
    private final String cookieName;

    private final RpcClientEventBroker clientEventBroker;

    /**
     * Creates the client. The session cookie name is read from the
     * {@code caf-security.general.cookie-name} property and falls back to
     * {@code ConstanceVarible.COOKIE_SESSION_NAME} when the property is not set.
     */
    public CAFRpcClientImpl(RpcClientEventBroker clientEventBroker,
                            InternalServiceManageService management,
                            RpcLocalInvoker rpcLocalInvoker,
                            ITenantService tenantService,
                            ITenantRouteService tenantRouteService) {
        this.clientEventBroker = clientEventBroker;
        this.management = management;
        this.rpcLocalInvoker = rpcLocalInvoker;
        this.tenantService = tenantService;
        this.tenantRouteService = tenantRouteService;
        String configuredCookieName = CafEnvironment.getEnvironment().getProperty("caf-security.general.cookie-name");
        this.cookieName = configuredCookieName != null ? configuredCookieName : ConstanceVarible.COOKIE_SESSION_NAME;
    }

    /**
     * Invokes a service using version {@code "latest"} and the default filter mode.
     */
    @SneakyThrows
    @Override
    public <T> T invoke(Class<T> t, String serviceId, String serviceUnitName, LinkedHashMap<String, Object> parameters,
                        RpcChannelType channelType, HashMap<String, String> context) {
        return this.invoke(new Type<>(t), serviceId, "latest", serviceUnitName, parameters, channelType,
                ConstanceVarible.RPC_FILTER_MODE_DEFAULT, context);
    }

    /**
     * Invokes a service with an explicit version and filter type, wrapping the raw class
     * into a {@link Type} token.
     */
    @SneakyThrows
    @Override
    public <T> T invoke(Class<T> t, String serviceId, String version, String serviceUnitName, LinkedHashMap<String, Object> parameters,
                        RpcChannelType channelType, String filterType, HashMap<String, String> context) {
        return this.invoke(new Type<>(t), serviceId, version, serviceUnitName, parameters, channelType, filterType, context);
    }

    /**
     * Invokes a service using version {@code "latest"} and the default filter mode.
     */
    @Override
    public <T> T invoke(Type<T> t, String serviceId, String serviceUnitName, LinkedHashMap<String, Object> parameters,
                        RpcChannelType channelType, HashMap<String, String> context) throws IllegalAccessException,
            InstantiationException, IOException, NoSuchMethodException, InvocationTargetException, ClassNotFoundException {
        return this.invoke(t, serviceId, "latest", serviceUnitName, parameters, channelType, ConstanceVarible.RPC_FILTER_MODE_DEFAULT,
                context);
    }

    /**
     * Invokes a service and returns the result as a {@link String}.
     */
    @Override
    public String invoke(String serviceId, String version, String serviceUnitName, LinkedHashMap<String, Object> parameters,
                         RpcChannelType channelType, String filterType, HashMap<String, String> context) {
        return this.invoke(String.class, serviceId, version, serviceUnitName, parameters, channelType, filterType, context);
    }


    /**
     * Core invocation routine that all other {@code invoke} overloads delegate to.
     *
     * <p>REST calls are forwarded to the {@link RpcClient} bean. Otherwise the call is
     * executed in-process when {@link RpcLocalChecker#isLocalInvoke(String)} reports the
     * target service unit as local, or sent through a channel built by
     * {@code RpcChannelFactory} (GRPC when {@code channelType} is {@code null}).
     *
     * @param t               expected return type
     * @param serviceId       id of the RPC service method
     * @param version         service version; {@code null} or empty is replaced by {@code "v1.0"}
     * @param serviceUnitName target service unit
     * @param parameters      call parameters
     * @param channelType     channel protocol, may be {@code null}
     * @param filterType      filter mode; {@code null} or empty is replaced by the default mode
     * @param context         optional routing context (see {@link #getTargetTenantId})
     * @return the (deserialized) return value of the service
     * @throws CAFRuntimeException when the local service definition is missing, or wrapping any
     *                             failure that is neither a {@link ServiceUnitNotFoundException}
     *                             nor a business {@link CAFRuntimeException}
     */
    @Override
    public <T> T invoke(Type<T> t, String serviceId, String version, String serviceUnitName, LinkedHashMap<String, Object> parameters,
                        RpcChannelType channelType, String filterType, HashMap<String, String> context) throws IllegalAccessException,
            InstantiationException, IOException, NoSuchMethodException, InvocationTargetException, ClassNotFoundException {
        // Compatible with the RPC 1.0 interface; may later be implemented on top of the RPC 2.0 REST interface.
        if (channelType != null && channelType.equals(RpcChannelType.REST)) {
            RpcClient rpcClient = SpringBeanUtils.getBean(RpcClient.class);
            // Explicit check instead of `assert`: assertions are disabled by default at runtime.
            if (rpcClient == null) {
                throw new IllegalStateException("RpcClient bean is not available, can not invoke through the REST channel");
            }
            return rpcClient.invoke(t, serviceId, serviceUnitName, parameters, context);
        }

        try {
            if (version == null || version.length() == 0) {
                version = "v1.0";
            }

            if (filterType == null || filterType.length() == 0) {
                filterType = ConstanceVarible.RPC_FILTER_MODE_DEFAULT;
            }

            // get cookies
            Cookie[] cks = this.getCookies();
            // check cookie only log warning not break invoke
            Validator.CheckCksSession(cks, serviceId);

            // get rpc service definition from local memory variable
            RpcServiceMethodDefinition serviceMethodDefinition = this.management.getRpcMethodDefinition(serviceId);
            // get extend tenant id
            Integer targetTenantId = getTargetTenantId(serviceUnitName, context);

            // rpc service event context
            HashMap<String, String> eventContext = new HashMap<>();
            eventContext.put(ConstanceVarible.CURRENT_SERVICE_ID, serviceId);
            eventContext.put(ConstanceVarible.TARGET_SU, serviceUnitName);

            log.info("start rpc invoke，current serviceId：{}，current target su:{}", serviceId, serviceUnitName);
            Object retvalue = null;

            // decide whether this is a local (in-process) invocation
            if (RpcLocalChecker.isLocalInvoke(serviceUnitName)) {
                // local memory has no rpc service definition
                if (serviceMethodDefinition == null) {
                    log.error("can not find rpc service by serviceid:" + serviceId + "，can not local invoke,current su：" + serviceUnitName);
                    throw new CAFRuntimeException(DefaultExceptionProperties.SERVICE_UNIT,
                            DefaultExceptionProperties.RESOURCE_FILE,
                            ExceptionErrorCode.localServiceUnitNotFound,
                            new String[]{serviceId, serviceUnitName},
                            null, ExceptionLevel.Error, false);
                }

                eventContext.put(ConstanceVarible.IS_LOCAL, "true");

                // guard: serializing the definition is only worth it when it is actually logged
                if (log.isInfoEnabled()) {
                    log.info("rpc local invoke info：{}", JSONSerializer.serialize(serviceMethodDefinition));
                }

                // local invoke (bean first, then reflect)
                long start = System.currentTimeMillis();
                T result = this.rpcLocalInvoker.invokeLocalService(t.getRawType(), serviceId, serviceUnitName, parameters,
                        serviceMethodDefinition, targetTenantId, eventContext);
                long end = System.currentTimeMillis();
                log.info("rpc local invoke client total time：{} ms", end - start);

                // a local invoke result is used as is, it is not serialized/deserialized again,
                // so the value logged below repeats the total time
                retvalue = result;
                log.info("rpc local invoke client Deserialize time：{} ms", end - start);
            } else {
                channelType = channelType == null ? RpcChannelType.GRPC : channelType;
                // create rpc channel by protocol
                RpcAbstractChannel channel = (RpcAbstractChannel) RpcChannelFactory.buildChannelClient(serviceUnitName, channelType, filterType, eventContext);
                // Explicit check instead of `assert`: assertions are disabled by default at runtime.
                if (channel == null) {
                    throw new IllegalStateException("can not build rpc channel for service unit: " + serviceUnitName);
                }
                String remoteAddress = channel.getHost() + ":" + channel.getPort();
                log.info("rpc remote address：{}", remoteAddress);

                // prepare event args
                eventContext.put(ConstanceVarible.IS_LOCAL, "false");
                eventContext.put(ConstanceVarible.CURRENT_REMOTEBASE_URL, remoteAddress);
                eventContext.put(ConstanceVarible.CURRENT_SERVICE_INTERFACE, serviceMethodDefinition == null ? null : serviceMethodDefinition.getParentDefinition().getClassName());
                HashMap<String, Object> localDict = new HashMap<>();
                localDict.put(CommonConstant.LOG_MSU_ID, serviceUnitName);
                // before invoke event; the trace id is read back from localDict afterwards
                this.clientEventBroker.firePreRpcInvokeEvent(eventContext, parameters, localDict);
                String traceId = (String) localDict.get(CommonConstant.LOG_TRACE_ID);

                // prepare rpc context
                HashMap<String, String> gspContext = new HashMap<>();
                setContext(gspContext, cks, serviceUnitName, targetTenantId, eventContext);

                // add additional headers to channel
                Map<String, String> headers = new HashMap<>();
                headers.put(CommonConstant.TRACE_ID_HEADER, traceId);
                headers.put(ConstanceVarible.GSP_CONTEXT, JSONSerializer.serialize(gspContext));
                channel.addHeaders(headers);

                // in RPC2.0 use the protobuf serialize as default
                LinkedHashMap<String, String> params = RpcSerializeUtil.serializeParameter4RPC2(parameters, serviceMethodDefinition);

                long start = System.currentTimeMillis();
                try {
                    var returnString = channel.invokeRemoteService(serviceId, version, params, null);
                    retvalue = RpcSerializeUtil.deSerializeReturnValue(t, returnString, serviceMethodDefinition);
                } finally {
                    // always release the channel, also when the remote call fails
                    channel.stopClient();
                }

                var serverContextString = RpcThreadCacheHolder.getValue(ConstanceVarible.GSP_RPC_SERVER_ENVENT);
                var serverContextDict = JSONSerializer.deserialize(serverContextString, LinkedHashMap.class);
                this.clientEventBroker.firePostRpcInvokeEvent(serverContextDict, retvalue, localDict);

                long end = System.currentTimeMillis();
                log.info("rpc remote invoke client total time:{} ms", end - start);
            }

            return (T) retvalue;
        } catch (Exception e) {
            log.debug("rpc client exception type:{}", e.getClass().getName());
            if (e instanceof ServiceUnitNotFoundException) {
                throw e;
            } else if (e instanceof CAFRuntimeException && ((CAFRuntimeException) e).isBizException()) {
                throw e;
            } else {
                // everything else is reported as a generic rpc error, keeping the cause
                throw new CAFRuntimeException(DefaultExceptionProperties.SERVICE_UNIT,
                        DefaultExceptionProperties.RESOURCE_FILE,
                        ExceptionErrorCode.rpcError,
                        new String[]{}, e, ExceptionLevel.Error, false);
            }
        } finally {
            RpcThreadCacheHolder.clear();
        }
    }


    /**
     * Not implemented; always throws.
     */
    @Override
    public InputStream invokeStream(String serviceId, String version, String serviceUnitName, LinkedHashMap<String, Object> parameters,
                                    RpcChannelType channelType, String filterType, HashMap<String, String> context) {
        // NOTE(review): this exception type is a JDK internal class (sun.reflect...); consider
        // migrating to UnsupportedOperationException once callers are confirmed not to depend on it.
        throw new NotImplementedException();
    }

    /**
     * Not implemented; always throws.
     */
    @Override
    public <T> T invokeStream(InputStream stream, Type<T> t, String serviceId, String version, String serviceUnitName,
                              LinkedHashMap<String, Object> parameters, RpcChannelType channelType, String filterType,
                              HashMap<String, String> context) {
        // NOTE(review): JDK internal exception type, see the other invokeStream overload.
        throw new NotImplementedException();
    }

    /**
     * Stores the timeout in {@code RpcTimeoutHolder}.
     */
    @Override
    public void setTimeout(Integer timeout) {
        RpcTimeoutHolder.setTimeout(timeout);
    }

    /**
     * Collects the cookies to use for the call.
     *
     * @return the cookies of the current servlet request when they contain the session cookie;
     * otherwise a single-element array holding a new session cookie built from the
     * {@link CAFContext} session id (any other request cookies are not kept in that case)
     */
    private Cookie[] getCookies() {
        Cookie[] cks = null;
        boolean containSessionId = false;
        String sessionid = "";
        // get request cookies
        ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (servletRequestAttributes != null) {
            HttpServletRequest request = servletRequestAttributes.getRequest();
            cks = request.getCookies();
            // if cookie has session, then use cookie session
            if (cks != null) {
                for (Cookie cookie : cks) {
                    if (cookie.getName().equalsIgnoreCase(this.cookieName)) {
                        containSessionId = true;
                        sessionid = cookie.getValue();
                        break;
                    }
                }
            }
        }

        if (log.isInfoEnabled()) {
            logSessionDiagnostics(containSessionId, sessionid);
        }

        if (!containSessionId) {
            cks = new Cookie[]{new Cookie(cookieName, base64Encode(CAFContext.current.getSessionId()))};
        }
        return cks;
    }

    /**
     * Logs, at info level, the session id found in the cookie and in {@link CAFContext}
     * together with their expiry state. An empty or undecodable id is reported as expired.
     */
    private void logSessionDiagnostics(boolean containSessionId, String encodedCookieSessionId) {
        // NOTE(review): session ids are written to the log here; confirm this is acceptable.
        ICafSessionService sessionsrv = SpringBeanUtils.getBean(ICafSessionService.class);
        String cookieSessionId = decodeSessionId(encodedCookieSessionId);
        log.info("rpc invoke requst has cookie:" + containSessionId);
        log.info("rpc get sessionid:" + cookieSessionId + " from cookie");
        log.info("rpc get sessionid from cookie isExpired:"
                + ((cookieSessionId != null && !cookieSessionId.isEmpty()) ? sessionsrv.isExpired(cookieSessionId) : true));

        String contextSessionId = CAFContext.current.getSessionId();
        log.info("rpc get sessionid:" + contextSessionId + " from cafcontext");
        log.info("rpc get sessionid from cafcontext isExpired："
                + ((contextSessionId != null && !"".equals(contextSessionId)) ? sessionsrv.isExpired(contextSessionId) : true));
    }

    /**
     * Decodes a Base64 encoded session id as UTF-8 text.
     *
     * @return the input itself when it is {@code null} or empty, the decoded text otherwise,
     * or {@code null} when the value is not valid Base64 (a warning is logged)
     */
    private String decodeSessionId(String encoded) {
        if (encoded == null || encoded.isEmpty()) {
            return encoded;
        }
        try {
            return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
        } catch (IllegalArgumentException e) {
            // diagnostics only: a malformed cookie must not abort the rpc call
            log.warn("rpc session cookie value is not valid Base64", e);
            return null;
        }
    }

    /**
     * Base64 encodes the UTF-8 bytes of {@code value}; {@code null} or empty yields {@code ""}.
     */
    private String base64Encode(String value) {
        if (value == null || value.isEmpty()) {
            return "";
        }
        return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Fills the context map that is sent to the remote side in the {@code GSP_CONTEXT} header.
     *
     * @param gspContext      map to populate
     * @param cks             cookies of the call, may be {@code null}
     * @param serviceUnitName target service unit
     * @param tenantId        routed tenant id, only written when not {@code null}
     * @param eventContext    client event context, stored JSON serialized
     */
    private void setContext(Map<String, String> gspContext, Cookie[] cks, String serviceUnitName, Integer tenantId,
                            Map<String, String> eventContext) {
        String sessionId = "";
        // set some context for early net core: value of the first cookie named NET_SESSION_ID
        if (cks != null) {
            for (Cookie cookie : cks) {
                if (cookie.getName().equalsIgnoreCase(ConstanceVarible.NET_SESSION_ID)) {
                    sessionId = cookie.getValue();
                    break;
                }
            }
        }
        gspContext.put(ConstanceVarible.GSP_CONTEXT_ID, sessionId);
        gspContext.put(ConstanceVarible.GSP_RPC, "true");
        gspContext.put(ConstanceVarible.GSP_MSU, serviceUnitName);
        if (tenantId != null) {
            gspContext.put(ConstanceVarible.GSP_RPC_TENANT, tenantId.toString());
        }
        gspContext.put(ConstanceVarible.GSP_RPC_CLIENT_ENVENT, SerializerFactory.getSerializer(SerializeType.Json).serializeToString(eventContext));
    }


    /**
     * Resolves the tenant id the call should be routed to.
     *
     * <p>Only applies when the tenancy mode is {@link TenancyMode#group}. The lookup order is:
     * an explicit {@code ROUTE_TENANTID} entry in {@code context}; then
     * {@link ITenantRouteService#route} when a first dimension is supplied; finally the tenant
     * held by {@link RequestTenantContextHolder}.
     *
     * @param msu     msu code
     * @param context extend parameters to choose route tenant, may be {@code null}
     * @return the resolved tenant id, or {@code null} when the tenancy mode is not group or no
     * tenant could be determined
     */
    private Integer getTargetTenantId(String msu, HashMap<String, String> context) {
        if (this.tenantService.getTenancyMode() != TenancyMode.group) {
            return null;
        }

        Map<String, String> routeContext = context == null ? new HashMap<String, String>() : context;
        String tenantIdString = routeContext.get(ConstanceVarible.ROUTE_TENANTID);
        if (tenantIdString != null) {
            return Integer.valueOf(tenantIdString);
        }

        String tenantDim1 = routeContext.get(ConstanceVarible.ROUTE_FIRSTDIMENSION_KEY);
        String tenantDim2 = routeContext.get(ConstanceVarible.ROUTE_SECONDDIMENSION_KEY);
        if (!StringUtils.isEmpty(tenantDim1)) {
            try {
                return tenantRouteService.route(msu, tenantDim1, tenantDim2);
            } catch (TenantNotFoundException e) {
                // best effort: log (with cause) and fall back to the request tenant below
                log.error(String.format("tenant route failed su：%s first dimension：%s ,second dimension：%s",
                        msu, tenantDim1, tenantDim2), e);
            }
        }

        // get request header tenant if set
        RequestTenantContextInfo contextInfo = RequestTenantContextHolder.get();
        return contextInfo == null ? null : contextInfo.getTenantId();
    }
}
